feat(server): support client side tracking (resp3 protocol only) #2233
Conversation
Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
std::string tmp;
std::string_view key = last_slot_it->first.GetSlice(&tmp);

DbTable* table = db_slice_->GetDBTable(cntx_.db_index);
PerformDeletion(last_slot_it, db_slice_->shard_owner(), table);
++evicted_;
db_slice_->SendInvalidationTrackingMessage(key);
In the general case it's not safe to hold a key string_view after the entry was deleted; you can use GetString()
Sounds good. Actually, I changed it to use ToString(), which uses GetString() inside.
PerformDeletion(it, shard_owner(), db_arr_[0].get());
SendInvalidationTrackingMessage(key);
Maybe there is some way to embed SendInvalidationTrackingMessage into PerformDeletion and PostUpdate? We call SendInvalidationTrackingMessage 10 times, and I'm not sure that covers all the cases
Ideally I would like to do it in PerformDeletion, just like PostUpdate. I couldn't, as it is not a member function and I need to access a member variable of DbSlice (the tracking table). One way is to define another PerformDeletion function as a member function of DbSlice...
client_tracking_map_.insert({k, tracker_set});
} else {
  std::pair<ConnectionContext*, int32_t> p{cntx, tid};
  client_tracking_map_[k].insert(p);
- Doesn't it work for the general case with [k]? I think that absl::flat_hash_set is default constructible
- You don't need k to be a string, you can use [string_view]
std::string k{key.begin(), key.end()};
if (client_tracking_map_.find(k) != client_tracking_map_.end()) {
Same here, find should work with strings
src/server/db_slice.cc
Outdated
auto is_not_tracking = [](std::pair<ConnectionContext*, int32_t> p) {
  return (!p.first->conn()->IsTrackingOn());
};
absl::erase_if(client_set, is_not_tracking);
The client should unregister itself from Service::OnClose()
Currently I'm relying on lazy garbage collection of those closed clients/contexts in the table: when a key is updated, its entry in the tracking table is removed, and therefore all the clients that are/were tracking this key are removed from the table.
It could be very costly to remove a client from the tracking table eagerly, especially when many keys are being tracked (we would have to iterate through the whole table to check whether the client is involved).
Then:
- You have to use Connection::WeakRef if you want to store the pointers indefinitely long
- You can only check IsTrackingOn on the destination thread (or you make this field atomic_bool, but then the pointer problem becomes more difficult)
- You can still accumulate lots of outdated entries for rarely used keys
> - You have to use Connection::WeakRef if you want to store the pointers indefinitely long

Yes, that's the plan: use WeakRef to replace the raw pointer to the context.

> - You can only check IsTrackingOn on the destination thread (or you make this field atomic_bool, but then the pointer problem becomes more difficult)

Yes, that's the current garbage collection logic in the implementation. As long as the WeakRef is a valid pointer, it allows me to access the tracking flag, which is set to false in OnClose().

> - You can still accumulate lots of outdated entries for rarely used keys

That's true. Note that Redis currently uses a very similar approach as well. Certainly one can implement another background garbage collection pass in the heartbeat to mitigate this.
    return;
  }
};
shard_set->pool()->DispatchBrief(std::move(cb));
This is where you need to use Connection::WeakRef: because DispatchBrief just queues a task to be dispatched later, the pointer can be invalidated before the task runs
you meant using the weak reference inside the callback function cb, right?
src/server/main_service.cc
Outdated
// if this is a read command, and client tracking has enabled,
// start tracking updates to the keys in this read command
// notify the client when there is update, see PostUpdate() in db_slice.cc
if ((cid->opt_mask() & CO::READONLY) && dfly_cntx->conn()->IsTrackingOn()) {
  OpResult<KeyIndex> key_index_res = DetermineKeys(cid, args_no_cmd);
  if (!key_index_res)
    return (*cntx)->SendError(key_index_res.status());

  const auto& key_index = *key_index_res;
  vector<string_view> keys_to_track;
  for (unsigned i = key_index.start; i < key_index.end; i += key_index.step) {
    string_view key = ArgS(args_no_cmd, i);
    keys_to_track.push_back(key);
  }

  // let's pass thread id and connection to db_slice for tracking
  int32_t tid = util::ProactorBase::GetIndex();

  // uint32_t client_id = dfly_cntx->conn()->GetClientId();
  auto cb = [&](Transaction* t, EngineShard* shard) {
    return OpTrackKeys(t->GetOpArgs(shard), dfly_cntx, tid, keys_to_track, args);
  };
  dfly_cntx->transaction->ScheduleSingleHopT(cb);
}
- We currently don't allow simply re-using the transaction 🤔 Does it work reliably?
- We could be doing this from the transaction when it's concluding, so we don't need to perform a hop for this
Also commands that end with STORE have a bonus key: key_index.bonus
Now I can say it doesn't work reliably, especially when multi dispatch is running... and thanks again for the Refurbish() function. That solves the issue.
By the way 😅 you shouldn't be computing keys_to_track; you can use t->GetShardArgs(shard->shard_id()) to get the keys inside the shard. Each shard should track only its keys, not the whole keys_to_track array, because each shard is responsible only for its non-intersecting subset of keys.
agreed, fixed!
Signed-off-by: Yue Li <61070669+theyueli@users.noreply.github.com>
@dranikpg the implementation is now using WeakRef. Could you help take another look to see if they are used correctly? Tested by killing clients and sending invalidation messages to those clients; no more invalid pointers are reported as DF runtime errors.
}

Connection* Connection::WeakRef::Get() const {
  // DCHECK_EQ(ProactorBase::me()->GetPoolIndex(), int(thread_));
@dranikpg I had to comment out this check... otherwise I'm running into the following crash:
F20231203 03:55:48.743674 2493204 dragonfly_connection.cc:1395] Check failed: ProactorBase::me()->GetPoolIndex() == int(thread_) (8 vs. 0)
*** Check failure stack trace: ***
@ 0x559a589b7221 google::LogMessage::Fail()
@ 0x559a589b7167 google::LogMessage::SendToLog()
@ 0x559a589b693c google::LogMessage::Flush()
@ 0x559a589ba7b4 google::LogMessageFatal::~LogMessageFatal()
@ 0x559a5822077f facade::Connection::WeakRef::Get()
@ 0x559a57d4f0d7 dfly::DbSlice::TrackKeys()
@ 0x559a571398c8 dfly::OpTrackKeys()
@ 0x559a57139c44 _ZZN4dfly7Service15DispatchCommandEN4absl12lts_202308024SpanINS3_IcEEEEPN6facade17ConnectionContextEENKUlPNS_11TransactionEPNS_11EngineShardEE_clESA_SC_
@ 0x559a57165021 _ZZN4dfly11Transaction18ScheduleSingleHopTIRZNS_7Service15DispatchCommandEN4absl12lts_202308024SpanINS5_IcEEEEPN6facade17ConnectionContextEEUlPS0_PNS_11EngineShardEE_EEDTclfp_fpTLDnEEEOT_ENKUlSB_SD_E_clESB_SD_
@ 0x559a571951c6 _ZSt13__invoke_implIN6facade8OpStatusERKZN4dfly11Transaction18ScheduleSingleHopTIRZNS2_7Service15DispatchCommandEN4absl12lts_202308024SpanINS8_IcEEEEPNS0_17ConnectionContextEEUlPS3_PNS2_11EngineShardEE_EEDTclfp_fpTLDnEEEOT_EUlSD_SF_E_JSD_SF_EESJ_St14__invoke_otherOT0_DpOT1_
@ 0x559a57188730 _ZSt8__invokeIRKZN4dfly11Transaction18ScheduleSingleHopTIRZNS0_7Service15DispatchCommandEN4absl12lts_202308024SpanINS6_IcEEEEPN6facade17ConnectionContextEEUlPS1_PNS0_11EngineShardEE_EEDTclfp_fpTLDnEEEOT_EUlSC_SE_E_JSC_SE_EENSt15__invoke_resultISI_JDpT0_EE4typeESJ_DpOSO_
@ 0x559a57180324 _ZSt6invokeIRKZN4dfly11Transaction18ScheduleSingleHopTIRZNS0_7Service15DispatchCommandEN4absl12lts_202308024SpanINS6_IcEEEEPN6facade17ConnectionContextEEUlPS1_PNS0_11EngineShardEE_EEDTclfp_fpTLDnEEEOT_EUlSC_SE_E_JSC_SE_EENSt13invoke_resultISI_JDpT0_EE4typeESJ_DpOSO_
@ 0x559a57177ce5 _ZN4absl12lts_2023080219functional_internal12InvokeObjectIZN4dfly11Transaction18ScheduleSingleHopTIRZNS3_7Service15DispatchCommandENS0_4SpanINS7_IcEEEEPN6facade17ConnectionContextEEUlPS4_PNS3_11EngineShardEE_EEDTclfp_fpTLDnEEEOT_EUlSD_SF_E_NSA_8OpStatusEJSD_SF_EEET0_NS1_7VoidPtrEDpNS1_8ForwardTIT1_E4typeE
@ 0x559a57e9f08b absl::lts_20230802::FunctionRef<>::operator()()
@ 0x559a57e71dc3 dfly::Transaction::RunQuickie()
@ 0x559a57e744c4 dfly::Transaction::ScheduleUniqueShard()
@ 0x559a57e68feb _ZZN4dfly11Transaction17ScheduleSingleHopEN4absl12lts_2023080211FunctionRefIFN6facade8OpStatusEPS0_PNS_11EngineShardEEEEENKUlvE_clEv
@ 0x559a57e902e5 _ZSt13__invoke_implIvRZN4dfly11Transaction17ScheduleSingleHopEN4absl12lts_2023080211FunctionRefIFN6facade8OpStatusEPS1_PNS0_11EngineShardEEEEEUlvE_JEET_St14__invoke_otherOT0_DpOT1_
@ 0x559a57e8e347 _ZSt10__invoke_rIvRZN4dfly11Transaction17ScheduleSingleHopEN4absl12lts_2023080211FunctionRefIFN6facade8OpStatusEPS1_PNS0_11EngineShardEEEEEUlvE_JEENSt9enable_ifIX16is_invocable_r_vIT_T0_DpT1_EESF_E4typeEOSG_DpOSH_
@ 0x559a57e8c7fd _ZNSt17_Function_handlerIFvvEZN4dfly11Transaction17ScheduleSingleHopEN4absl12lts_2023080211FunctionRefIFN6facade8OpStatusEPS2_PNS1_11EngineShardEEEEEUlvE_E9_M_invokeERKSt9_Any_data
@ 0x559a5728e48f std::function<>::operator()()
@ 0x559a588f556d util::fb2::FiberQueue::Run()
@ 0x559a578e710e _ZZN4dfly11EngineShardC4EPN4util3fb212ProactorBaseEP9mi_heap_sENKUlvE0_clEv
@ 0x559a57901b8a _ZSt13__invoke_implIvZN4dfly11EngineShardC4EPN4util3fb212ProactorBaseEP9mi_heap_sEUlvE0_JEET_St14__invoke_otherOT0_DpOT1_
@ 0x559a578fee41 _ZSt8__invokeIZN4dfly11EngineShardC4EPN4util3fb212ProactorBaseEP9mi_heap_sEUlvE0_JEENSt15__invoke_resultIT_JDpT0_EE4typeEOSA_DpOSB_
@ 0x559a578fc35d _ZSt12__apply_implIZN4dfly11EngineShardC4EPN4util3fb212ProactorBaseEP9mi_heap_sEUlvE0_St5tupleIJEEJEEDcOT_OT0_St16integer_sequenceImJXspT1_EEE
@ 0x559a578fc3da _ZSt5applyIZN4dfly11EngineShardC4EPN4util3fb212ProactorBaseEP9mi_heap_sEUlvE0_St5tupleIJEEEDcOT_OT0_
@ 0x559a578fc5ea _ZN4util3fb26detail15WorkerFiberImplIZN4dfly11EngineShardC4EPNS0_12ProactorBaseEP9mi_heap_sEUlvE0_JEE4run_EON5boost7context5fiberE
@ 0x559a578f97d3 _ZZN4util3fb26detail15WorkerFiberImplIZN4dfly11EngineShardC4EPNS0_12ProactorBaseEP9mi_heap_sEUlvE0_JEEC4IN5boost7context21basic_fixedsize_stackINSD_12stack_traitsEEEEESt17basic_string_viewIcSt11char_traitsIcEERKNSD_12preallocatedEOT_OS9_ENKUlONSD_5fiberEE_clESS_
@ 0x559a579091c2 _ZSt13__invoke_implIN5boost7context5fiberERZN4util3fb26detail15WorkerFiberImplIZN4dfly11EngineShardC4EPNS4_12ProactorBaseEP9mi_heap_sEUlvE0_JEEC4INS1_21basic_fixedsize_stackINS1_12stack_traitsEEEEESt17basic_string_viewIcSt11char_traitsIcEERKNS1_12preallocatedEOT_OSD_EUlOS2_E_JS2_EESQ_St14__invoke_otherOT0_DpOT1_
@ 0x559a57907685 _ZSt8__invokeIRZN4util3fb26detail15WorkerFiberImplIZN4dfly11EngineShardC4EPNS1_12ProactorBaseEP9mi_heap_sEUlvE0_JEEC4IN5boost7context21basic_fixedsize_stackINSE_12stack_traitsEEEEESt17basic_string_viewIcSt11char_traitsIcEERKNSE_12preallocatedEOT_OSA_EUlONSE_5fiberEE_JSS_EENSt15__invoke_resultISP_JDpT0_EE4typeESQ_DpOSX_
@ 0x559a579054e2 _ZSt6invokeIRZN4util3fb26detail15WorkerFiberImplIZN4dfly11EngineShardC4EPNS1_12ProactorBaseEP9mi_heap_sEUlvE0_JEEC4IN5boost7context21basic_fixedsize_stackINSE_12stack_traitsEEEEESt17basic_string_viewIcSt11char_traitsIcEERKNSE_12preallocatedEOT_OSA_EUlONSE_5fiberEE_JSS_EENSt13invoke_resultISP_JDpT0_EE4typeESQ_DpOSX_
*** SIGABRT received at time=1701604548 on cpu 8 ***
PC: @ 0x7f72c3dc29fc (unknown) pthread_kill
@ 0x559a58a3c3fb 64 absl::lts_20230802::WriteFailureInfo()
@ 0x559a58a3c655 96 absl::lts_20230802::AbslFailureSignalHandler()
@ 0x7f72c3d6e520 (unknown) (unknown)
Yep, it was there for a reason. Will update the interface 🙂
  return ptr_.lock().get();
}

uint32_t Connection::WeakRef::GetClientId() const {
@dranikpg added this feature to WeakRef as well, in 6f8af7d
facade::Connection* conn = it->first.Get();
if (conn == nullptr)
  continue;
Please use IsExpired(), and then we can keep the DCHECK inside Get()
fixes: #2139
This set of changes is meant to provide a minimal client tracking implementation for integrating with Relay.
Note: this is still a draft. This branch provides a prototype being used for testing with the Relay team. We will divide these patches into smaller production PRs after all the tests pass with Relay.